import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.integrate import quad
from scipy.optimize import minimize
from datetime import datetime as dt
from eod import EodHistoricalData
from nelson_siegel_svensson import NelsonSiegelSvenssonCurve
from nelson_siegel_svensson.calibrate import calibrate_nss_ols, calibrate_ns_ols
import matplotlib
# import matplotlib.pyplot as plt
# from mpl_toolkits.mplot3d import axes3d
from matplotlib.backends.backend_pgf import FigureCanvasPgf
matplotlib.backend_bases.register_backend('pdf', FigureCanvasPgf)
matplotlib.rcParams.update({
"pgf.texsystem": "pdflatex",
'font.family': 'serif',
'text.usetex': True,
'font.size': 14,
'legend.fontsize': 13,
'pgf.rcfonts': False
})
from scipy.stats import norm
import scipy.integrate as integrate
from scipy.special import iv
def blackScholes(r, S, K, T, sigma, type="c"):
d1 = (np.log(S/K) + (r + sigma**2/2)*T)/(sigma*np.sqrt(T))
d2 = d1 - sigma*np.sqrt(T)
if type == "c":
price = S*norm.cdf(d1, 0, 1) - K*np.exp(-r*T)*norm.cdf(d2, 0, 1)
elif type == "p":
price = K*np.exp(-r*T)*norm.cdf(-d2, 0, 1) - S*norm.cdf(-d1, 0, 1)
return price
def switchingCall(X0,K,T,r,s0,s1,l0,l1,pi=-1):
if pi == -1:
pi = l1/(l0+l1)
def s(x, T):
return s0**2 * x + s1**2 * (T-x)
def g0(x, T):
return np.sqrt(l0*l1*x/(T-x))
def g1(x, T):
return np.sqrt(l0*l1*(T-x)/x)
def h(x, T):
return np.sqrt(l0*l1*x*(T-x))
def f0(x, T):
return np.exp(-l0 * x - l1 * (T-x))*(g0(x,T)*iv(1, 2*h(x,T)) + l0 * iv(0, 2*h(x,T)))
def f1(x,T):
return np.exp(-l0 * x - l1 * (T-x))*(g1(x,T)*iv(1, 2*h(x,T)) + l1 * iv(0, 2*h(x,T)))
def integrand0(x, K, T):
return blackScholes(r, X0, K, T, np.sqrt(s(x,T)/T)) * f0(x, T)
def integrand1(x, K, T):
return blackScholes(r, X0, K, T, np.sqrt(s(x,T)/T)) * f1(x, T)
def C0(K, T):
return blackScholes(r, X0, K, T, s0)*np.exp(-l0*T) + integrate.quad(integrand0, 0, T, args=(K,T))[0]
def C1(K, T):
return blackScholes(r, X0, K, T, s1)*np.exp(-l0*T) + integrate.quad(integrand1, 0, T, args=(K,T))[0]
# print(C0(K,T), C1(K,T))
return pi * C0(K,T) + (1-pi) * C1(K,T)
yield_maturities = np.array([1/12, 2/12, 3/12, 4/12, 6/12, 1, 2, 3, 5, 7, 10, 20, 30])
yields = np.array([5.51,5.56,5.56,5.60,5.49,5.43,5.02,4.72,4.45,4.41,4.33,4.59,4.42]).astype(float)/100
#NSS model calibrate
curve_fit, status = calibrate_nss_ols(yield_maturities,yields)
curve_fit
t = np.linspace(0, 30, 100)
plt.plot(t, curve_fit(t)*100, color='black', label='Nelson-Siegel-Svensson Model')
plt.scatter(yield_maturities, yields*100, facecolors='none', edgecolors='black', label='Treasury Par Yield Curve Rate')
plt.grid(linewidth = 0.5)
plt.legend()
plt.ylabel("Rate (\%)")
plt.xlabel("Tenor (Years)")
# plt.savefig("yield.pgf")
curve_fit, status = calibrate_ns_ols(yield_maturities,yields)
print(curve_fit)
t = np.linspace(0, 30, 100)
plt.plot(t, curve_fit(t)*100, color='black', label='Nelson-Siegel Model')
plt.scatter(yield_maturities, yields*100, facecolors='none', edgecolors='black', label='Treasury Par Yield Curve Rate')
plt.grid(linewidth = 0.5)
plt.legend()
plt.ylabel("Rate (\%)")
plt.xlabel("Tenor (Years)")
# plt.savefig("yield2.pgf")
import yfinance as yf
def option_chains(ticker):
asset = yf.Ticker(ticker)
# print(asset)
expirations = asset.options
# print(expirations[:10])
market_prices = {}
for expiration in expirations:
opt = asset.option_chain(expiration)
chain = opt.calls
# print(chain)
expiration = pd.to_datetime(expiration) + pd.DateOffset(hours=23, minutes=59, seconds=59)
market_prices[expiration] = {}
market_prices[expiration]['strike'] = chain['strike'].tolist()
# market_prices[expiration]['price'] = (chain['bid']/2 + chain['ask']/2).tolist()
market_prices[expiration]['price'] = chain['lastPrice'].tolist()
return market_prices
options = option_chains("SPY")
# fig = plt.figure()
# ax = fig.add_subplot(projection='3d')
# for i,v in options.items():
# ax.scatter(v['maturity'], v['strike'], v['lastPrice'])
all_strikes = [v['strike'] for i,v in options.items()]
common_strikes = set.intersection(*map(set,all_strikes))
# common_strikes.update([420,430,480,490])
print('Number of common strikes:', len(common_strikes))
common_strikes = sorted(common_strikes)
prices = []
maturities = []
for date, v in options.items():
maturities.append((date - dt.today()).days/365.25)
price = [v['price'][i] for i,x in enumerate(v['strike']) if x in common_strikes]
prices.append(price)
price_arr = np.array(prices, dtype=object)
np.shape(price_arr)
volSurface = pd.DataFrame(price_arr, index = maturities, columns = common_strikes)
volSurface = volSurface.iloc[(volSurface.index > 0.04) & (volSurface.index <= 1)]
volSurface
def convert_prices_to_chain(prices):
output = pd.DataFrame()
for expiry, pair in prices.items():
# print(expiry, pair)
for strike, price in zip(pair['strike'], pair['price']):
output.loc[((expiry - dt.today()).days/365.25), strike] = price
output = output.reindex(sorted(output.columns), axis=1)
return output
volSurface = convert_prices_to_chain(options)
volSurface = volSurface.loc[:, (volSurface.columns >= 400.0) & (volSurface.columns <= 500.0)]
volSurface = volSurface.iloc[(volSurface.index > 0.2)]
volSurface
print(volSurface.columns.values)
S0 = 445
from scipy.stats import norm
N = norm.cdf
def bs_call(S, K, T, r, vol):
d1 = (np.log(S/K) + (r + 0.5*vol**2)*T) / (vol*np.sqrt(T))
d2 = d1 - vol * np.sqrt(T)
return S * norm.cdf(d1) - np.exp(-r * T) * K * norm.cdf(d2)
def bs_vega(S, K, T, r, sigma):
d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
return S * norm.pdf(d1) * np.sqrt(T)
def find_vol(target_value, S, K, T, r, *args):
MAX_ITERATIONS = 200
PRECISION = 1.0e-5
sigma = 0.5
for i in range(0, MAX_ITERATIONS):
price = bs_call(S, K, T, r, sigma)
vega = bs_vega(S, K, T, r, sigma)
diff = target_value - price # our root
if (abs(diff) < PRECISION).all() :
return sigma
sigma = sigma + diff/vega # f(x) / f'(x)
return sigma # value wasn't found, return best guess so far
vect = np.vectorize(find_vol)
# Convert our vol surface to dataframe for each option price with parameters
volSurfaceLong = volSurface.melt(ignore_index=False).dropna().reset_index()
volSurfaceLong.columns = ['maturity', 'strike', 'price']
# Calculate the risk free rate for each maturity using the fitted yield curve
volSurfaceLong['rate'] = volSurfaceLong['maturity'].apply(curve_fit)
# volSurfaceLong['marketIV'] = volSurfaceLong.apply(lambda x: vect(x.price, S0, x.strike, x.maturity, x.rate), axis=1)
volSurfaceLong
S0 = 428.8
r = volSurfaceLong['rate'].to_numpy('float')
K = volSurfaceLong['strike'].to_numpy('float')
tau = volSurfaceLong['maturity'].to_numpy('float')
P = volSurfaceLong['price'].to_numpy('float')
params = {"sig0" : {"x0": 0.16, "lbub": [1e-2,1]},
"sig1" : {"x0": 0.07, "lbub": [1e-2,1]},
"l0" : {"x0": 0.76, "lbub": [1e-2,5]},
"l1" : {"x0": 1.97, "lbub": [1e-2,5]}
}
x0 = [param["x0"] for key, param in params.items()]
bnds = [param["lbub"] for key, param in params.items()]
# switchingCall(X0,K,T,r,s0,s1,l0,l1)
def callback(xk):
print(f"Iteration: {callback.nfev}")
print(f"Current solution: {xk}")
callback.nfev += 1
callback.nfev = 0
def SqErr(x):
sig0, sig1, l0, l1 = [param for param in x]
err = np.sum([ (P_i-switchingCall(S0, K_i, tau_i, r_i, sig0, sig1, l0, l1))**2 /len(P) \
for P_i, K_i, tau_i, r_i in zip(P, K, tau, r)])
pen = np.sum( [(x_i-x0_i)**2 for x_i, x0_i in zip(x, x0)] )
return err + pen
print("About to calibrate")
result = minimize(SqErr, x0, tol = 1e-5, method='L-BFGS-B', options={'maxiter': 1e4 }, bounds=bnds, callback=callback)
sig0, sig1, l0, l1 = [param for param in result.x]
sig0, sig1, l0, l1
# (0.10074434838728742, 0.15631085210777004, 0.807983501745533, 2.0304931134722355)
# About to calibrate
# Iteration: 0
# Current solution: [0.01 0.01 5. 0.01]
# Iteration: 1
# Current solution: [0.01 0.10952676 2.60712973 1.35982573]
# Iteration: 2
# Current solution: [0.01 0.19772305 2.45767573 1.42899444]
# Iteration: 3
# Current solution: [0.01 0.2280412 2.34995689 1.48091238]
# Iteration: 4
# Current solution: [0.01 0.22237417 2.29474563 1.51001524]
# Iteration: 5
# Current solution: [0.01 0.20246025 1.83737575 1.74714761]
# Iteration: 6
# Current solution: [0.01111231 0.20483723 1.63835345 1.84657843]
# Iteration: 7
# Current solution: [0.03534615 0.21478326 1.38540795 1.951807 ]
# Iteration: 8
# Current solution: [0.04896322 0.21736113 1.17978456 2.043398 ]
# Iteration: 9
# Current solution: [0.05589635 0.2177989 0.94020518 2.1540299 ]
# Iteration: 10
# Current solution: [0.06869119 0.22015591 0.79607144 2.21474713]
# Iteration: 11
# Current solution: [0.07962909 0.22192583 0.63965716 2.28260622]
# Iteration: 12
# Current solution: [0.08410475 0.22468342 0.55137829 2.29334664]
# Iteration: 13
# Current solution: [0.08545409 0.22915719 0.5144569 2.24198814]
# Iteration: 14
# Current solution: [0.08127998 0.24277412 0.43500788 2.06614759]
# Iteration: 15
# Current solution: [0.07894165 0.24418271 0.45237824 2.02970636]
# Iteration: 16
# Current solution: [0.0775697 0.24716911 0.43740063 1.98573046]
# sig0, sig1, l0, l1 = (0.07756969778166047, 0.247169113443559, 0.4374006282788692, 1.9857304635023587)
# sig0, sig1, l0, l1 = (0.07756969778166047, 0.247169113443559, 0.4374006282788692, 1.9857304635023587)
# sig0, sig1, l0, l1 = (0.07580256, 0.25515649, 0.54693518, 1.91751383)
vectorized_call = np.vectorize(switchingCall)
switch_prices = vectorized_call(S0, K, tau, r, sig0, sig1, l0, l1)
volSurfaceLong['switch_price'] = switch_prices
# import pickle
# with open('best_calibration.pkl', 'wb') as f:
# pickle.dump(volSurfaceLong, f)
# with open('best_calibration_final.pkl', 'rb') as f:
# volSurfaceLong = pickle.load(f)
import plotly.graph_objects as go
from plotly.graph_objs import Surface
from plotly.offline import iplot, init_notebook_mode
init_notebook_mode()
x = np.linspace(400, 500, 100)
y = np.linspace(0.1, 2.3, 23)
X, Y = np.meshgrid(x, y)
Z = vectorized_call(S0, X, Y, curve_fit(Y), sig0, sig1, l0, l1)
fig = go.Figure(data=[go.Surface(x=X, y=Y, z=Z)])
fig.add_scatter3d(x=volSurfaceLong.strike, y=volSurfaceLong.maturity, z=volSurfaceLong.price, mode='markers')
fig.update_layout(
title_text='Market Prices (Mesh) vs Calibrated Regime-switching Prices (Markers)',
scene = dict(yaxis_title='TIME (Years)',
xaxis_title='STRIKES (Pts)',
zaxis_title='INDEX OPTION PRICE (Pts)'),
height=800,
width=800
)
fig.show()
# print(list(volSurface.index.values), list(volSurface.columns.values))
x = np.linspace(400, 500, 50)
y = np.linspace(0.005, 2.3, 25)
X, Y = np.meshgrid(x, y)
# print(X.size,Y.size)
Z = vectorized_call(S0, X, Y, curve_fit(Y), sig0, sig1, l0, l1)
X.shape, Y.shape, Z.shape
Z[0]
plt.figure(figsize=(8, 6), dpi=80)
ax = plt.axes(projection='3d')
ax.plot_wireframe(X, Y, Z, color='black', cstride=5, rstride=2, label='Calibrated Price Surface')
ax.scatter(volSurfaceLong.strike, volSurfaceLong.maturity, volSurfaceLong.price, label='Market Prices')
ax.set_xlabel('Strike (K)')
ax.set_ylabel('Time to Maturity (T)')
ax.set_zlabel('Price')
ax.xaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.yaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.zaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.legend()
# plt.savefig("fittedpricesurface.pgf",bbox_inches='tight')
volSurfaceLong
volSurfaceLong['implVol'] = vect(volSurfaceLong['switch_price'], S0, volSurfaceLong['strike'], \
volSurfaceLong['maturity'], volSurfaceLong['rate'])
volSurfaceLong['marketIV'] = vect(volSurfaceLong['price'], S0, volSurfaceLong['strike'], \
volSurfaceLong['maturity'], volSurfaceLong['rate'])
volSurfaceLong
find_vol(45.38, 445,400,0.0566,0.01369)
x = np.linspace(400, 500, 50)
y = np.linspace(0.1, 2.3, 25)
X,Y = np.meshgrid(x,y)
Z = vect(vectorized_call(S0, X, Y, curve_fit(Y), sig0, sig1, l0, l1), S0, X, Y, curve_fit(Y))
fig2 = go.Figure(data=[go.Surface(x=X, y=Y, z=Z, opacity=0.55)])
fig2.update_layout(
title_text='Implied Volatility',
scene = dict(xaxis_title='TIME (Years)',
yaxis_title='STRIKES (Pts)',
zaxis_title='Implied Volatility'),
height=800,
width=800
)
fig2.show()
plt.figure(figsize=(8, 6), dpi=80)
ax = plt.axes(projection='3d')
ax.plot_wireframe(X, Y, Z, color='black', cstride=5, rstride=2)
# ax.scatter(volSurfaceLong.strike, volSurfaceLong.maturity, volSurfaceLong.price)
ax.set_xlabel('Strike (K)')
ax.set_ylabel('Time to Maturity (T)')
ax.set_zlabel('Implied Volatility')
ax.xaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.yaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.zaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
# plt.savefig("fittedvolsurface.pgf", bbox_inches='tight')
volSurfaceLong['error'] = (volSurfaceLong['switch_price']-volSurfaceLong['price'])/volSurfaceLong['price']
volSurfaceLong
volSurfaceLong[volSurfaceLong.error > 100]
volSurfaceLong['IVerror'] = (volSurfaceLong['implVol']-volSurfaceLong['marketIV'])/volSurfaceLong['marketIV']
volSurfaceLong[volSurfaceLong.IVerror > 20]
import plotly.express as px
fig = px.scatter_3d(volSurfaceLong, x='maturity', y='strike', z='error')
fig.show()
plt.figure(figsize=(8, 6), dpi=80)
ax = plt.axes(projection='3d')
ax.scatter(volSurfaceLong.strike, volSurfaceLong.maturity, volSurfaceLong.error)
ax.set_xlabel('Strike (K)')
ax.set_ylabel('Time to Maturity (T)')
ax.set_zlabel('Error (\%)')
ax.xaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.yaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.zaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
ax.view_init(30, 120)
# plt.savefig("calibrationerror.pgf", bbox_inches='tight')
import plotly
plotly.offline.init_notebook_mode()